Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
    @@ -56,6 +57,66 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
        except Exception:
            await self.storage.update_task(task_operation['params']['id'], state='failed')
🔴 Exception handler bypasses new update_task method, leaving stream subscribers hanging forever
When run_task or cancel_task raises an exception in _handle_task_operation, the except block at line 58 calls self.storage.update_task(...) directly instead of the new self.update_task(...) method introduced in this PR. The new self.update_task() method (fasta2a/worker.py:60-118) is documented as "the primary method workers should use to update task state" because it both persists the update to storage AND publishes streaming events to the broker via send_stream_event. By calling self.storage.update_task() directly, no TaskStatusUpdateEvent(final=True) is ever published to subscribers. Any active subscribe_to_stream iterator for that task will block indefinitely waiting for a final event, causing the SSE connection to hang.
Suggested change:

    -        await self.storage.update_task(task_operation['params']['id'], state='failed')
    +        await self.update_task(task_operation['params']['id'], state='failed')
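To make the failure mode concrete, here is a minimal, self-contained sketch of the pattern the comment describes. The `Worker`, storage dict, and event list below are stand-ins, not the real fasta2a classes: the point is that `update_task()` both persists the state and publishes a `final=True` event, while calling the storage layer directly would do only the former.

```python
# Minimal sketch (hypothetical stand-ins for fasta2a's Storage/Broker) showing
# why the except block must route through update_task(): it persists the state
# AND publishes the final stream event that SSE subscribers are waiting on.
import asyncio


class Worker:
    def __init__(self) -> None:
        self.tasks: dict[str, str] = {}   # storage stand-in: task_id -> state
        self.events: list[dict] = []      # broker stand-in: published stream events

    async def _storage_update(self, task_id: str, state: str) -> None:
        # Persists only; publishes nothing (the bug when called directly).
        self.tasks[task_id] = state

    async def update_task(self, task_id: str, state: str) -> None:
        await self._storage_update(task_id, state)
        # Terminal states emit final=True so stream subscribers stop waiting.
        final = state in ('completed', 'failed', 'canceled')
        self.events.append({'task_id': task_id, 'state': state, 'final': final})

    async def handle(self, task_id: str) -> None:
        try:
            raise RuntimeError('boom')    # simulate run_task raising
        except Exception:
            # The fix: go through update_task so the final event is published.
            await self.update_task(task_id, state='failed')


worker = Worker()
asyncio.run(worker.handle('t1'))
print(worker.tasks['t1'], worker.events[-1]['final'])  # failed True
```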
    broker_params['metadata'] = metadata

    # Start task execution in background
    asyncio.create_task(self.broker.run_task(broker_params))
🔴 Fire-and-forget asyncio.create_task loses exceptions and task reference
At fasta2a/task_manager.py:194, asyncio.create_task(self.broker.run_task(broker_params)) creates a background task but discards the returned Task object. If run_task raises an exception, Python emits a "Task exception was never retrieved" warning and the error is silently lost — the streaming client will never know the task failed to even start. The task object is also subject to garbage collection since no reference is held, which could cause the task to be cancelled unexpectedly (per Python docs: "the event loop only keeps weak references to tasks").
Prompt for agents
In fasta2a/task_manager.py at line 194 in the stream_message method, the asyncio.create_task result is discarded. Store the task reference and add error handling. One approach: store the task in a local variable, and after the subscribe_to_stream loop completes, check if the background task raised an exception (e.g. via task.result() in a try/except). Alternatively, restructure to use a task group (e.g. anyio.create_task_group) to properly manage the background task's lifecycle and propagate exceptions.
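One possible shape of the first approach, as a runnable sketch (hypothetical names, not the fasta2a code): hold a reference to the background task and attach a done callback that retrieves the exception, so failures are logged or propagated instead of silently dropped.

```python
import asyncio


async def run_task(params: dict) -> None:
    raise RuntimeError('task failed to start')   # stand-in for broker.run_task failing


failures: list[BaseException] = []


def _on_done(task: asyncio.Task) -> None:
    if task.cancelled():
        return
    exc = task.exception()                       # retrieving it suppresses the warning
    if exc is not None:
        failures.append(exc)                     # real code would log / mark the task failed


async def main() -> None:
    task = asyncio.create_task(run_task({}))     # keep the reference alive
    task.add_done_callback(_on_done)
    await asyncio.gather(task, return_exceptions=True)
    await asyncio.sleep(0)                       # give the done callback a chance to run


asyncio.run(main())
print(type(failures[0]).__name__)  # RuntimeError
```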
    # Parse activated extensions from the A2A-Extensions header
    extensions_header = request.headers.get('a2a-extensions', '')
    activated_extensions: list[str] = (
        [uri.strip() for uri in extensions_header.split(',') if uri.strip()] if extensions_header else []
    )
    # Stash on the request state so workers / handlers can inspect them
    request.state.activated_extensions = activated_extensions
🚩 activated_extensions parsed but never consumed by downstream code
At fasta2a/applications.py:144-150, the A2A-Extensions header is parsed and stored on request.state.activated_extensions with a comment saying "so workers / handlers can inspect them." However, this state is never passed to TaskManager, Broker, or Worker — those components receive only TaskSendParams which doesn't include activated extensions information. The request.state is scoped to the HTTP request handler and is not accessible from workers. This appears to be scaffolding for future use rather than a bug, but it means extensions have no functional effect currently.
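The parsing logic itself is straightforward; here it is extracted as a standalone sketch (the real code lives in fasta2a/applications.py): comma-separated URIs, whitespace-trimmed, empty entries dropped.

```python
def parse_extensions_header(value: str) -> list[str]:
    # Split on commas, trim whitespace, drop empty segments.
    return [uri.strip() for uri in value.split(',') if uri.strip()] if value else []


print(parse_extensions_header(' https://ext.example/a , https://ext.example/b, '))
# ['https://ext.example/a', 'https://ext.example/b']
```

For the parsed list to have any functional effect, it would still need to travel with the task (for example inside the params' metadata) so the worker can see it.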
Pull request overview
Adds A2A extensions support and introduces an SSE-based message/stream implementation, including broker pub/sub and worker-driven stream event publishing.
Changes:
- Extend agent-card capabilities to advertise supported `extensions` and a default `streaming` capability.
- Implement `message/stream` end-to-end: TaskManager streaming generator, broker stream pub/sub, worker event publishing, and SSE HTTP response.
- Add/lock new dependencies (`sse-starlette`, plus dev deps) and introduce streaming-focused test coverage.
Reviewed changes
Copilot reviewed 10 out of 12 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `fasta2a/schema.py` | Adds extensions to agent capabilities and defines `StreamEvent` union/type adapter. |
| `fasta2a/applications.py` | Adds extension header parsing, streaming toggle, and SSE `message/stream` endpoint. |
| `fasta2a/task_manager.py` | Implements `stream_message()` as an async generator yielding stream events. |
| `fasta2a/broker.py` | Adds streaming event send/subscribe APIs and an in-memory pub/sub implementation. |
| `fasta2a/worker.py` | Adds `Worker.update_task()` helper to persist updates and publish stream events. |
| `fasta2a/__init__.py` | Exports `AgentExtension` and `StreamEvent`. |
| `fasta2a/storage.py` | Import formatting only (no behavioral change). |
| `tests/test_streaming.py` | New test suite covering broker streaming, worker event publishing, TaskManager streaming, and SSE endpoint. |
| `tests/test_applications.py` | Updates agent-card snapshot to expect streaming enabled. |
| `pyproject.toml` | Adds runtime `sse-starlette` and dev deps used by streaming/tests. |
| `uv.lock` | Locks newly added dependencies/markers. |
| `.gitignore` | Ignores coverage artifacts (`.coverage*`). |
    # Start task execution in background
    asyncio.create_task(self.broker.run_task(broker_params))
The background asyncio.create_task(self.broker.run_task(...)) isn't tracked or awaited. If it raises, you'll get "Task exception was never retrieved" and the SSE generator may keep streaming indefinitely. Consider capturing the task and handling exceptions (e.g., add a done callback that logs and/or publishes a failed final status) and cancel it if the client disconnects.
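A sketch of that shape (hypothetical names; the real generator lives in the streaming endpoint): capture the task, retrieve exceptions in a done callback, and cancel the background work when the generator is closed, which is what happens on a client disconnect.

```python
import asyncio


async def run_task(params: dict) -> None:
    await asyncio.sleep(60)                      # long-running work stand-in


cancelled: list[str] = []


async def stream(params: dict):
    task = asyncio.create_task(run_task(params))
    # Retrieve the exception (if any) so it is never "never retrieved".
    task.add_done_callback(lambda t: t.cancelled() or t.exception())
    try:
        yield {'kind': 'status-update', 'final': False}
        # ... stream further events ...
    finally:
        if not task.done():
            task.cancel()                        # client went away: stop the work
            cancelled.append('cancelled')


async def main() -> None:
    gen = stream({})
    await gen.__anext__()                        # client receives one event...
    await gen.aclose()                           # ...then disconnects


asyncio.run(main())
print(cancelled)  # ['cancelled']
```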
    if a2a_request['method'] == 'message/send':
        jsonrpc_response = await self.task_manager.send_message(a2a_request)
    elif a2a_request['method'] == 'message/stream':
        stream_request = stream_message_request_ta.validate_json(data)

        async def sse_generator():
            request_id = stream_request.get('id')
            async for event in self.task_manager.stream_message(stream_request):
                jsonrpc_response = StreamMessageResponse(
                    jsonrpc='2.0',
                    id=request_id,
                    result=event,
                )
                yield stream_message_response_ta.dump_json(jsonrpc_response, by_alias=True).decode()

        return EventSourceResponse(sse_generator())
FastA2A advertises streaming capability via AgentCapabilities.streaming, but _agent_run_endpoint will still accept and serve message/stream even when self.streaming is False. Add a guard that returns an appropriate JSON-RPC error (e.g., MethodNotFound/UnsupportedOperation) when streaming is disabled so behavior matches the advertised capabilities.
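A guard along those lines could look like this sketch (hypothetical handler shape; per JSON-RPC 2.0, "Method not found" is error code -32601): reject `message/stream` up front when streaming is disabled.

```python
def dispatch(method: str, request_id, streaming: bool) -> dict:
    if method == 'message/stream' and not streaming:
        # Match the advertised capability: refuse streaming when it is off.
        return {
            'jsonrpc': '2.0',
            'id': request_id,
            'error': {'code': -32601, 'message': 'Streaming is not supported by this agent'},
        }
    return {'jsonrpc': '2.0', 'id': request_id, 'result': {}}


print(dispatch('message/stream', 1, streaming=False)['error']['code'])  # -32601
```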
        'capabilities': {
    -       'streaming': False,
    +       'streaming': True,
            'pushNotifications': False,
The PR metadata focuses on "extensions", but this change also flips the default advertised streaming capability to True (and adds a new message/stream SSE endpoint). If this broader behavior change is intentional, it should be reflected in the PR title/description (or split into a separate PR) to avoid surprising consumers.
    except Exception:
        await self.storage.update_task(task_operation['params']['id'], state='failed')
In the exception handler, the worker updates storage directly, which bypasses the new Worker.update_task() logic and therefore won't publish a final status-update event to stream subscribers on failures. Use Worker.update_task(..., state='failed') here (and consider capturing/logging the exception) so streaming clients reliably see task termination.
    async with self._subscriber_lock:
        subscribers = self._event_subscribers.get(task_id, [])
        if not subscribers:
            return

        # Send event to all subscribers, removing closed streams
        active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
        for stream in subscribers:
            try:
                await stream.send(event)
                active_subscribers.append(stream)
            except (anyio.ClosedResourceError, anyio.BrokenResourceError):
                # Subscriber disconnected, remove from list
                pass

        # Update subscriber list with only active ones
        if active_subscribers:
            self._event_subscribers[task_id] = active_subscribers
        elif task_id in self._event_subscribers:
            # No active subscribers left, clean up
send_stream_event() holds _subscriber_lock while awaiting stream.send(...). If a subscriber is slow (buffer full) this blocks all subscription changes and event sends across all tasks. Consider copying the current subscriber list under the lock, releasing it before awaiting sends, then reacquiring to prune/commit active subscribers.
Suggested change:

Before:

    async with self._subscriber_lock:
        subscribers = self._event_subscribers.get(task_id, [])
        if not subscribers:
            return
        # Send event to all subscribers, removing closed streams
        active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
        for stream in subscribers:
            try:
                await stream.send(event)
                active_subscribers.append(stream)
            except (anyio.ClosedResourceError, anyio.BrokenResourceError):
                # Subscriber disconnected, remove from list
                pass
        # Update subscriber list with only active ones
        if active_subscribers:
            self._event_subscribers[task_id] = active_subscribers
        elif task_id in self._event_subscribers:
            # No active subscribers left, clean up

After:

    # Take a snapshot of current subscribers under the lock.
    async with self._subscriber_lock:
        subscribers = list(self._event_subscribers.get(task_id, []))
    if not subscribers:
        return
    # Send event to all subscribers without holding the lock, tracking active ones.
    active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
    for stream in subscribers:
        try:
            await stream.send(event)
            active_subscribers.append(stream)
        except (anyio.ClosedResourceError, anyio.BrokenResourceError):
            # Subscriber disconnected, remove from list on reconciliation.
            pass
    # Reacquire the lock to reconcile the active subscribers with the current list.
    async with self._subscriber_lock:
        current = self._event_subscribers.get(task_id)
        if current is None:
            # Task already cleaned up or no subscribers remain.
            return
        if not active_subscribers:
            # No active subscribers left, clean up if still present.
            if task_id in self._event_subscribers:
                del self._event_subscribers[task_id]
            return
        # Preserve only streams that are still active and still present in the current list.
        active_set = set(active_subscribers)
        new_current = [stream for stream in current if stream in active_set]
        if new_current:
            self._event_subscribers[task_id] = new_current
        elif task_id in self._event_subscribers:
            # No active subscribers left, clean up.
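The snapshot / send-outside-the-lock / reconcile pattern can be demonstrated end to end with plain asyncio primitives (queues standing in for the anyio memory streams; the class below is a stand-in, not the fasta2a broker):

```python
import asyncio


class Broker:
    """Stand-in broker showing the snapshot / lock-free send / reconcile pattern."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._subs: dict[str, list[asyncio.Queue]] = {}

    async def subscribe(self, task_id: str) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        async with self._lock:
            self._subs.setdefault(task_id, []).append(q)
        return q

    async def send_stream_event(self, task_id: str, event: dict) -> None:
        # 1. Snapshot under the lock; never await sends while holding it.
        async with self._lock:
            subscribers = list(self._subs.get(task_id, []))
        if not subscribers:
            return
        # 2. Send outside the lock; a slow subscriber only delays this call.
        delivered: list[asyncio.Queue] = []
        for q in subscribers:
            await q.put(event)
            delivered.append(q)
        # 3. Reacquire to reconcile with subscribers added/removed meanwhile.
        async with self._lock:
            current = self._subs.get(task_id, [])
            kept = [q for q in current if q in delivered or q not in subscribers]
            if kept:
                self._subs[task_id] = kept
            else:
                self._subs.pop(task_id, None)


async def main() -> list:
    broker = Broker()
    q = await broker.subscribe('t1')
    await broker.send_stream_event('t1', {'state': 'working'})
    return [q.get_nowait()]


events = asyncio.run(main())
print(events)  # [{'state': 'working'}]
```

The reconciliation step keeps streams that were delivered to, plus any that subscribed after the snapshot was taken, so concurrent subscription changes are not lost.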
    # Start task execution in background
    asyncio.create_task(self.broker.run_task(broker_params))

    # Stream events from broker
    async for event in self.broker.subscribe_to_stream(task['id']):
stream_message() can miss early stream events: it yields the initial task, then schedules broker.run_task(), and only afterwards starts iterating subscribe_to_stream(). If the worker publishes status updates quickly, they can be sent before the subscription is registered and will be dropped. Restructure so the subscription is registered before starting task execution (and ideally before yielding the first event), or change the broker API so subscription registration happens eagerly when subscribe_to_stream() is called.
Suggested change:

Before:

    # Start task execution in background
    asyncio.create_task(self.broker.run_task(broker_params))
    # Stream events from broker
    async for event in self.broker.subscribe_to_stream(task['id']):

After:

    # Register subscription to the broker's event stream before starting task execution
    stream = self.broker.subscribe_to_stream(task['id'])
    # Start task execution in background
    asyncio.create_task(self.broker.run_task(broker_params))
    # Stream events from broker
    async for event in stream:
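The ordering issue is easy to reproduce in isolation. In this sketch (stand-in pub/sub, not the fasta2a broker), the subscription is registered before the task starts, so an event published immediately by the worker is buffered rather than dropped; task-reference handling is elided for brevity.

```python
import asyncio


subscribers: dict[str, asyncio.Queue] = {}


async def publish(task_id: str, event: dict) -> None:
    q = subscribers.get(task_id)
    if q is not None:
        await q.put(event)                 # dropped if nobody has subscribed yet


async def run_task(task_id: str) -> None:
    await publish(task_id, {'state': 'working'})   # may fire immediately


async def stream(task_id: str) -> dict:
    # Register the subscription FIRST, then start the task in the background.
    subscribers[task_id] = asyncio.Queue()
    asyncio.create_task(run_task(task_id))
    return await subscribers[task_id].get()        # early event is not lost


result = asyncio.run(stream('t1'))
print(result)  # {'state': 'working'}
```

Reversing the two lines in `stream()` would let `run_task` publish into the void before the queue exists, which is exactly the race described above.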
Support extensions https://a2a-protocol.org/latest/topics/extensions/
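For context on what this PR advertises: the A2A extensions topic describes extensions being declared in the agent card's capabilities and activated per-request via the `A2A-Extensions` header. A card fragment advertising one optional extension might look like the sketch below (the URI is a placeholder; field names follow the A2A `AgentExtension` schema as I understand it).

```python
agent_card_capabilities = {
    'streaming': True,
    'pushNotifications': False,
    'extensions': [
        {
            # Placeholder URI identifying a hypothetical extension.
            'uri': 'https://example.com/ext/my-extension/v1',
            'description': 'Example optional extension',
            'required': False,
        }
    ],
}

# A client would then activate it per-request with a header such as:
#   A2A-Extensions: https://example.com/ext/my-extension/v1
print(len(agent_card_capabilities['extensions']))  # 1
```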